//
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include "src/core/ext/filters/deadline/deadline_filter.h"

#include <stdbool.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>

#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/channel_init.h"

//
// grpc_deadline_state
//
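
// The deadline timer for a call moves through three states, tracked
// atomically in deadline_state->timer_state:
//
//   GRPC_DEADLINE_STATE_INITIAL:  no timer has been started yet
//   GRPC_DEADLINE_STATE_PENDING:  a timer is currently set
//   GRPC_DEADLINE_STATE_FINISHED: a previously set timer has been cancelled
//
// Transitions are made with compare-and-swap so that concurrent
// attempts to start or cancel the timer are safe.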

// Timer callback.  If the error is not GRPC_ERROR_CANCELLED (i.e., the
// timer actually fired rather than being cancelled), the deadline has
// passed, so fail the call with DEADLINE_EXCEEDED.
static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
                           grpc_error* error) {
  grpc_call_element* elem = arg;
  grpc_deadline_state* deadline_state = elem->call_data;
  if (error != GRPC_ERROR_CANCELLED) {
    grpc_call_element_signal_error(
        exec_ctx, elem,
        grpc_error_set_int(
            GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
            GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED));
  }
  GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer");
}

// Starts the deadline timer.
static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
                                  grpc_call_element* elem,
                                  gpr_timespec deadline) {
  deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
  if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) == 0) {
    return;
  }
  grpc_deadline_state* deadline_state = elem->call_data;
  grpc_deadline_timer_state cur_state;
  grpc_closure* closure = NULL;
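  // Loop until we either observe a pending timer (nothing to do) or win
  // the CAS that moves timer_state to PENDING.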
retry:
  cur_state =
      (grpc_deadline_timer_state)gpr_atm_acq_load(&deadline_state->timer_state);
  switch (cur_state) {
    case GRPC_DEADLINE_STATE_PENDING:
      // Note: We do not start the timer if there is already a timer pending.
      return;
    case GRPC_DEADLINE_STATE_FINISHED:
      if (gpr_atm_rel_cas(&deadline_state->timer_state,
                          GRPC_DEADLINE_STATE_FINISHED,
                          GRPC_DEADLINE_STATE_PENDING)) {
        // If we've already created and destroyed a timer, we always create a
        // new closure: we have no other guarantee that the inlined closure is
        // not in use (it may hold a pending call to timer_callback).
        closure = GRPC_CLOSURE_CREATE(timer_callback, elem,
                                      grpc_schedule_on_exec_ctx);
      } else {
        goto retry;
      }
      break;
    case GRPC_DEADLINE_STATE_INITIAL:
      if (gpr_atm_rel_cas(&deadline_state->timer_state,
                          GRPC_DEADLINE_STATE_INITIAL,
                          GRPC_DEADLINE_STATE_PENDING)) {
        closure =
            GRPC_CLOSURE_INIT(&deadline_state->timer_callback, timer_callback,
                              elem, grpc_schedule_on_exec_ctx);
      } else {
        goto retry;
      }
      break;
  }
  GPR_ASSERT(closure);
  GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
  grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure,
                  gpr_now(GPR_CLOCK_MONOTONIC));
}

// Cancels the deadline timer.
static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
                                   grpc_deadline_state* deadline_state) {
  if (gpr_atm_rel_cas(&deadline_state->timer_state, GRPC_DEADLINE_STATE_PENDING,
                      GRPC_DEADLINE_STATE_FINISHED)) {
    grpc_timer_cancel(exec_ctx, &deadline_state->timer);
  } else {
    // The timer was either in STATE_INITIAL (it was never started, so
    // there is nothing to cancel) or in STATE_FINISHED (it was already
    // cancelled, so again there is nothing to cancel).
  }
}

// Callback run when the call is complete.
static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
  grpc_deadline_state* deadline_state = arg;
  cancel_timer_if_needed(exec_ctx, deadline_state);
  // Invoke the next callback.
  GRPC_CLOSURE_RUN(exec_ctx, deadline_state->next_on_complete,
                   GRPC_ERROR_REF(error));
}

// Injects our own on_complete callback into the op, so that we can
// cancel the timer when the call completes.
static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
                                  grpc_transport_stream_op_batch* op) {
  deadline_state->next_on_complete = op->on_complete;
  GRPC_CLOSURE_INIT(&deadline_state->on_complete, on_complete, deadline_state,
                    grpc_schedule_on_exec_ctx);
  op->on_complete = &deadline_state->on_complete;
}

// Callback and associated state for starting the timer after call stack
// initialization has been completed.
struct start_timer_after_init_state {
  grpc_call_element* elem;
  gpr_timespec deadline;
  grpc_closure closure;
};
static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
                                   grpc_error* error) {
  struct start_timer_after_init_state* state = arg;
  start_timer_if_needed(exec_ctx, state->elem, state->deadline);
  gpr_free(state);
}

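// NOTE: The functions below cast elem->call_data directly to
// grpc_deadline_state*, so any filter that uses them must make
// grpc_deadline_state the first field of its call data (as
// base_call_data does below).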
void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
                              grpc_call_stack* call_stack,
                              gpr_timespec deadline) {
  grpc_deadline_state* deadline_state = elem->call_data;
  deadline_state->call_stack = call_stack;
  // The deadline will always be infinite on servers, so the timer will
  // only be set on clients with a finite deadline.
  deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
  if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
    // When the deadline passes, we indicate the failure by sending down
    // an op with cancel_error set.  However, we can't send down any ops
    // until after the call stack is fully initialized.  If we start the
    // timer here, we have no guarantee that the timer won't pop before
    // call stack initialization is finished.  To avoid that problem, we
    // create a closure to start the timer, and we schedule that closure
    // to be run after call stack initialization is done.
    struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state));
    state->elem = elem;
    state->deadline = deadline;
    GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state,
                      grpc_schedule_on_exec_ctx);
    GRPC_CLOSURE_SCHED(exec_ctx, &state->closure, GRPC_ERROR_NONE);
  }
}

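// Cancels the deadline timer, if one is still pending, when the call's
// deadline state is torn down.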
void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
                                 grpc_call_element* elem) {
  grpc_deadline_state* deadline_state = elem->call_data;
  cancel_timer_if_needed(exec_ctx, deadline_state);
}

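// Cancels any pending timer and starts a new one for new_deadline.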
void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
                               gpr_timespec new_deadline) {
  grpc_deadline_state* deadline_state = elem->call_data;
  cancel_timer_if_needed(exec_ctx, deadline_state);
  start_timer_if_needed(exec_ctx, elem, new_deadline);
}

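// Client-side hook for transport stream op batches: cancels the timer
// if the call is being cancelled; otherwise, on recv_trailing_metadata,
// injects an on_complete callback so the timer is cancelled when the
// call completes.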
void grpc_deadline_state_client_start_transport_stream_op_batch(
    grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
    grpc_transport_stream_op_batch* op) {
  grpc_deadline_state* deadline_state = elem->call_data;
  if (op->cancel_stream) {
    cancel_timer_if_needed(exec_ctx, deadline_state);
  } else {
    // Make sure we know when the call is complete, so that we can cancel
    // the timer.
    if (op->recv_trailing_metadata) {
      inject_on_complete_cb(deadline_state, op);
    }
  }
}

//
// filter code
//

// Constructor for channel_data.  Used for both client and server filters.
static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
                                     grpc_channel_element* elem,
                                     grpc_channel_element_args* args) {
  GPR_ASSERT(!args->is_last);
  return GRPC_ERROR_NONE;
}

// Destructor for channel_data.  Used for both client and server filters.
static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
                                 grpc_channel_element* elem) {}

// Call data used for both the client and server filters.
typedef struct base_call_data {
  grpc_deadline_state deadline_state;
} base_call_data;

// Additional call data used only for the server filter.
typedef struct server_call_data {
  base_call_data base;  // Must be first.
  // The closure for receiving initial metadata.
  grpc_closure recv_initial_metadata_ready;
  // Received initial metadata batch.
  grpc_metadata_batch* recv_initial_metadata;
  // The original recv_initial_metadata_ready closure, which we chain to
  // after our own closure is invoked.
  grpc_closure* next_recv_initial_metadata_ready;
} server_call_data;

// Constructor for call_data.  Used for both client and server filters.
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
                                  grpc_call_element* elem,
                                  const grpc_call_element_args* args) {
  grpc_deadline_state_init(exec_ctx, elem, args->call_stack, args->deadline);
  return GRPC_ERROR_NONE;
}

// Destructor for call_data.  Used for both client and server filters.
static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
                              const grpc_call_final_info* final_info,
                              grpc_closure* ignored) {
  grpc_deadline_state_destroy(exec_ctx, elem);
}

// Starts a transport stream op batch for the client filter.
static void client_start_transport_stream_op_batch(
    grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
    grpc_transport_stream_op_batch* op) {
  grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
                                                             op);
  // Chain to next filter.
  grpc_call_next_op(exec_ctx, elem, op);
}

// Callback for receiving initial metadata on the server.
static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg,
                                        grpc_error* error) {
  grpc_call_element* elem = arg;
  server_call_data* calld = elem->call_data;
  // Get deadline from metadata and start the timer if needed.
  start_timer_if_needed(exec_ctx, elem, calld->recv_initial_metadata->deadline);
  // Invoke the next callback.
  calld->next_recv_initial_metadata_ready->cb(
      exec_ctx, calld->next_recv_initial_metadata_ready->cb_arg, error);
}

// Starts a transport stream op batch for the server filter.
static void server_start_transport_stream_op_batch(
    grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
    grpc_transport_stream_op_batch* op) {
  server_call_data* calld = elem->call_data;
  if (op->cancel_stream) {
    cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state);
  } else {
    // If we're receiving initial metadata, we need to get the deadline
    // from the recv_initial_metadata_ready callback.  So we inject our
    // own callback into that hook.
    if (op->recv_initial_metadata) {
      calld->next_recv_initial_metadata_ready =
          op->payload->recv_initial_metadata.recv_initial_metadata_ready;
      calld->recv_initial_metadata =
          op->payload->recv_initial_metadata.recv_initial_metadata;
      GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
                        recv_initial_metadata_ready, elem,
                        grpc_schedule_on_exec_ctx);
      op->payload->recv_initial_metadata.recv_initial_metadata_ready =
          &calld->recv_initial_metadata_ready;
    }
    // Make sure we know when the call is complete, so that we can cancel
    // the timer.
    // Note that we trigger this on recv_trailing_metadata, even though
    // the client never sends trailing metadata, because this is the
    // hook that tells us when the call is complete on the server side.
    if (op->recv_trailing_metadata) {
      inject_on_complete_cb(&calld->base.deadline_state, op);
    }
  }
  // Chain to next filter.
  grpc_call_next_op(exec_ctx, elem, op);
}

const grpc_channel_filter grpc_client_deadline_filter = {
    client_start_transport_stream_op_batch,
    grpc_channel_next_op,
    sizeof(base_call_data),
    init_call_elem,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    destroy_call_elem,
    0,  // sizeof(channel_data)
    init_channel_elem,
    destroy_channel_elem,
    grpc_call_next_get_peer,
    grpc_channel_next_get_info,
    "deadline",
};

const grpc_channel_filter grpc_server_deadline_filter = {
    server_start_transport_stream_op_batch,
    grpc_channel_next_op,
    sizeof(server_call_data),
    init_call_elem,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    destroy_call_elem,
    0,  // sizeof(channel_data)
    init_channel_elem,
    destroy_channel_elem,
    grpc_call_next_get_peer,
    grpc_channel_next_get_info,
    "deadline",
};

bool grpc_deadline_checking_enabled(const grpc_channel_args* channel_args) {
  return grpc_channel_arg_get_bool(
      grpc_channel_args_find(channel_args, GRPC_ARG_ENABLE_DEADLINE_CHECKS),
      !grpc_channel_args_want_minimal_stack(channel_args));
}
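
// Example (illustrative sketch only; not used in this file): a caller
// could disable deadline checking on a channel by setting the
// GRPC_ARG_ENABLE_DEADLINE_CHECKS arg to 0, e.g. with the helpers from
// channel_args.h:
//
//   grpc_arg arg = grpc_channel_arg_integer_create(
//       (char*)GRPC_ARG_ENABLE_DEADLINE_CHECKS, 0);
//   grpc_channel_args args = {1 /* num_args */, &arg};
//
// grpc_deadline_checking_enabled() would then return false for those
// args.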

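// Channel-init hook that prepends the deadline filter unless deadline
// checking has been disabled via channel args.  Returning true without
// prepending omits the filter but allows stack construction to proceed.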
static bool maybe_add_deadline_filter(grpc_exec_ctx* exec_ctx,
                                      grpc_channel_stack_builder* builder,
                                      void* arg) {
  return grpc_deadline_checking_enabled(
             grpc_channel_stack_builder_get_channel_arguments(builder))
             ? grpc_channel_stack_builder_prepend_filter(builder, arg, NULL,
                                                         NULL)
             : true;
}

void grpc_deadline_filter_init(void) {
  grpc_channel_init_register_stage(
      GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
      maybe_add_deadline_filter, (void*)&grpc_client_deadline_filter);
  grpc_channel_init_register_stage(
      GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
      maybe_add_deadline_filter, (void*)&grpc_server_deadline_filter);
}

void grpc_deadline_filter_shutdown(void) {}
